﻿using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

namespace CRL.RabbitMQ
{
    // Requires the delayed-message plugin; enable it with: rabbitmq-plugins enable rabbitmq_delayed_message_exchange (RabbitMQ 3.9 or later is required)
    //https://www.cnblogs.com/chrischennx/p/7274556.html
    //https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
    /// <summary>
    /// RabbitMQ helper that works against an exchange of type "x-delayed-message",
    /// attaching an "x-delay" header (in milliseconds) to every published message.
    /// </summary>
    /// <remarks>
    /// The "x-delayed-message" exchange type comes from the
    /// rabbitmq_delayed_message_exchange broker plugin, which must be enabled on the server.
    /// </remarks>
    public class DelayRabbitMQ: AbsRabbitMQ
    {
        /// <summary>Exchange-declare argument naming the routing behaviour of the delayed exchange.</summary>
        private const string DelayedTypeArgument = "x-delayed-type";

        /// <summary>Message header carrying the per-message delay in milliseconds.</summary>
        private const string DelayHeader = "x-delay";

        /// <summary>Exchange type used by this class.</summary>
        protected override string MqExchangeType => "x-delayed-message";

        /// <summary>
        /// Creates the helper and records the exchange name it operates on.
        /// </summary>
        /// <param name="config">Connection settings, passed to the base class.</param>
        /// <param name="exchangeName">Name of the delayed exchange.</param>
        public DelayRabbitMQ(ConnectionConfig config, string exchangeName) : base(config)
        {
            __exchangeName = exchangeName;
            Log($"{MqExchangeType}队列:初始化");
        }

        /// <summary>
        /// Ensures <paramref name="option"/> and its exchange-declare arguments exist and
        /// sets "x-delayed-type" to "direct".
        /// </summary>
        /// <param name="option">Consume options; replaced with a new instance when null.</param>
        private void initOption(ref ConsumeOption option)
        {
            option = option ?? new ConsumeOption();
            option.ExchangeDeclareArgs = option.ExchangeDeclareArgs ?? new Dictionary<string, object>();
            // Indexer instead of Add: the same ConsumeOption may be passed to several
            // BeginReceive* calls, and Add would throw on the already-present key.
            option.ExchangeDeclareArgs[DelayedTypeArgument] = "direct";
        }

        /// <summary>
        /// Writes the delay header onto <paramref name="properties"/>, creating the header
        /// dictionary when it is missing and overwriting any existing "x-delay" value.
        /// </summary>
        /// <param name="properties">Message properties to modify.</param>
        /// <param name="delayMs">Delay in milliseconds.</param>
        private static void SetDelayHeader(IBasicProperties properties, int delayMs)
        {
            properties.Headers = properties.Headers ?? new Dictionary<string, object>();
            // Indexer instead of Add so a pre-existing "x-delay" header does not throw.
            properties.Headers[DelayHeader] = delayMs;
        }

        /// <summary>
        /// Publishes <paramref name="msgs"/> with an "x-delay" header of <paramref name="delayMs"/>.
        /// </summary>
        /// <typeparam name="T">Message type.</typeparam>
        /// <param name="routingKey">Routing key passed to <c>BasePublish</c>.</param>
        /// <param name="delayMs">Delay in milliseconds.</param>
        /// <param name="msgs">Messages to publish.</param>
        public void Publish<T>(string routingKey, int delayMs, params T[] msgs)
        {
            BasePublish(routingKey, props => SetDelayHeader(props, delayMs), msgs);
        }

        /// <summary>
        /// Publishes <paramref name="msgs"/> with an "x-delay" header of <paramref name="delayMs"/>,
        /// letting the caller customise the message properties first.
        /// </summary>
        /// <typeparam name="T">Message type.</typeparam>
        /// <param name="routingKey">Routing key passed to <c>BasePublish</c>.</param>
        /// <param name="delayMs">Delay in milliseconds.</param>
        /// <param name="basicPropertiesFunc">
        /// Optional callback run before the delay header is applied; may be null.
        /// An "x-delay" header set by this callback is overwritten with <paramref name="delayMs"/>.
        /// </param>
        /// <param name="msgs">Messages to publish.</param>
        public void Publish<T>(string routingKey, int delayMs, Action<IBasicProperties> basicPropertiesFunc, params T[] msgs)
        {
            Action<IBasicProperties> applyDelay = props => SetDelayHeader(props, delayMs);
            // Delegate combination tolerates a null left operand; the caller's callback runs first.
            BasePublish(routingKey, basicPropertiesFunc + applyDelay, msgs);
        }

        /// <summary>
        /// Declares the queue and exchange, binds them with <paramref name="routingKey"/>,
        /// and starts a consumer that hands messages to <paramref name="onReceive"/>.
        /// </summary>
        /// <typeparam name="T">Message type delivered to the callback.</typeparam>
        /// <param name="queueName">Queue to consume from.</param>
        /// <param name="routingKey">Binding key between the exchange and the queue.</param>
        /// <param name="onReceive">Callback invoked by the base consumer.</param>
        /// <param name="option">Optional consume options; "x-delayed-type" is set on its exchange arguments.</param>
        public void BeginReceive<T>(string queueName, string routingKey, Action<T, string> onReceive, ConsumeOption option = null)
        {
            initOption(ref option);
            var channel = CreateConsumerChannel();
            QueueDeclare(channel, queueName, option);
            ExchangeDeclare(channel, option);
            QueueBind(channel, queueName, routingKey, option);
            Log($"开始消费,类型:Delay 队列:{queueName} Key:{routingKey}");
            base.BaseBeginConsumer(channel, queueName, onReceive, option);
        }

        /// <summary>
        /// Declares the queue and exchange, binds them with <paramref name="routingKey"/>,
        /// and starts a consumer that hands raw message bytes to <paramref name="onReceive"/>.
        /// </summary>
        /// <param name="queueName">Queue to consume from.</param>
        /// <param name="routingKey">Binding key between the exchange and the queue.</param>
        /// <param name="onReceive">Callback invoked by the base consumer.</param>
        /// <param name="option">Optional consume options; "x-delayed-type" is set on its exchange arguments.</param>
        public void BeginReceive(string queueName, string routingKey, Action<byte[], string> onReceive, ConsumeOption option = null)
        {
            initOption(ref option);
            var channel = CreateConsumerChannel();
            QueueDeclare(channel, queueName, option);
            ExchangeDeclare(channel, option);
            QueueBind(channel, queueName, routingKey, option);
            Log($"开始消费,类型:Delay 队列:{queueName} Key:{routingKey}");
            base.BaseBeginConsumerString(channel, queueName, onReceive, option);
        }

        /// <summary>
        /// Declares the queue and exchange, binds them with <paramref name="routingKey"/>,
        /// and starts a consumer with a Task-returning callback.
        /// </summary>
        /// <param name="queueName">Queue to consume from.</param>
        /// <param name="routingKey">Binding key between the exchange and the queue.</param>
        /// <param name="onReceive">Task-returning callback invoked by the base consumer.</param>
        /// <param name="option">Optional consume options; "x-delayed-type" is set on its exchange arguments.</param>
        public void BeginReceiveAsync(string queueName, string routingKey, Func<byte[], string, Task> onReceive, ConsumeOption option = null)
        {
            initOption(ref option);
            var channel = CreateConsumerChannel();
            QueueDeclare(channel, queueName, option);
            ExchangeDeclare(channel, option);
            QueueBind(channel, queueName, routingKey, option);
            Log($"开始消费,类型:Delay 队列:{queueName} Key:{routingKey}");
            base.BaseBeginConsumerAsync(channel, queueName, onReceive, option);
        }
    }
}
